lance_encoding/encodings/physical/
binary.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Basic encodings for variable width data
5//!
6//! These are not compression but represent the "leaf" encodings for variable length data
7//! where we simply match the data with the rules of the structural encoding.
8//!
9//! These encodings are transparent since we aren't actually doing any compression.  No information
10//! is needed in the encoding description.
11
12use arrow_array::OffsetSizeTrait;
13use bytemuck::cast_slice;
14use byteorder::{ByteOrder, LittleEndian};
15use core::panic;
16use snafu::location;
17
18use crate::compression::{
19    BlockCompressor, BlockDecompressor, MiniBlockDecompressor, VariablePerValueDecompressor,
20};
21
22use crate::buffer::LanceBuffer;
23use crate::data::{BlockInfo, DataBlock, VariableWidthBlock};
24use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
25use crate::encodings::logical::primitive::miniblock::{
26    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
27};
28use crate::format::pb21::compressive_encoding::Compression;
29use crate::format::pb21::CompressiveEncoding;
30use crate::format::{pb21, ProtobufUtils21};
31
32use lance_core::utils::bit::pad_bytes_to;
33use lance_core::{Error, Result};
34
/// Encodes variable-width (binary/string) data into self-contained miniblock chunks.
///
/// This is a "leaf" encoding: each chunk holds its own (rebased) offsets followed
/// by the value bytes, with no actual compression applied.
#[derive(Debug, Default)]
pub struct BinaryMiniBlockEncoder {}

// Target (soft maximum) size, in bytes, for each miniblock chunk.
const AIM_MINICHUNK_SIZE: i64 = 4 * 1024;
39
40// Make it to support both u32 and u64
// Split variable-width values into miniblock chunks.  Generic over the offset
// type (`i32`/`i64` via `OffsetSizeTrait`) so it supports both 32-bit and
// 64-bit offsets.
//
// Returns a single concatenated output buffer plus one `MiniBlockChunk`
// descriptor per chunk.  Within each chunk the offsets (rebased to point inside
// the chunk) are written first, followed by the value bytes, and the chunk is
// padded up to `alignment` bytes.
fn chunk_offsets<N: OffsetSizeTrait>(
    offsets: &[N],
    data: &[u8],
    alignment: usize,
) -> (Vec<LanceBuffer>, Vec<MiniBlockChunk>) {
    #[derive(Debug)]
    struct ChunkInfo {
        // index into `offsets` of the first offset belonging to this chunk
        chunk_start_offset_in_orig_idx: usize,
        // index into `offsets` of the last offset belonging to this chunk (inclusive)
        chunk_last_offset_in_orig_idx: usize,
        // the bytes in every chunk starts at `chunk.bytes_start_offset`
        bytes_start_offset: usize,
        // every chunk is padded to `alignment` bytes.
        // we need to interpret every chunk as &[u32] so we need it to padded at least to 4 bytes,
        // this field can actually be eliminated and I can use `num_bytes` in `MiniBlockChunk` to compute
        // the `output_total_bytes`.
        padded_chunk_size: usize,
    }

    let byte_width: usize = N::get_byte_width();
    let mut chunks_info = vec![];
    let mut chunks = vec![];
    let mut last_offset_in_orig_idx = 0;
    loop {
        // Find where the next chunk ends (keeps each chunk near AIM_MINICHUNK_SIZE).
        let this_last_offset_in_orig_idx = search_next_offset_idx(offsets, last_offset_in_orig_idx);

        let num_values_in_this_chunk = this_last_offset_in_orig_idx - last_offset_in_orig_idx;
        let chunk_bytes = offsets[this_last_offset_in_orig_idx] - offsets[last_offset_in_orig_idx];
        // chunk payload = (num_values + 1) rebased offsets followed by the value bytes
        let this_chunk_size =
            (num_values_in_this_chunk + 1) * byte_width + chunk_bytes.to_usize().unwrap();

        let padded_chunk_size = this_chunk_size.next_multiple_of(alignment);

        // value bytes begin right after the offsets region
        let this_chunk_bytes_start_offset = (num_values_in_this_chunk + 1) * byte_width;
        chunks_info.push(ChunkInfo {
            chunk_start_offset_in_orig_idx: last_offset_in_orig_idx,
            chunk_last_offset_in_orig_idx: this_last_offset_in_orig_idx,
            bytes_start_offset: this_chunk_bytes_start_offset,
            padded_chunk_size,
        });
        chunks.push(MiniBlockChunk {
            // the final chunk uses 0 (meaning "all remaining values"); every other
            // chunk holds a power-of-two value count, so log2 == trailing_zeros
            log_num_values: if this_last_offset_in_orig_idx == offsets.len() - 1 {
                0
            } else {
                num_values_in_this_chunk.trailing_zeros() as u8
            },
            buffer_sizes: vec![padded_chunk_size as u16],
        });
        if this_last_offset_in_orig_idx == offsets.len() - 1 {
            break;
        }
        last_offset_in_orig_idx = this_last_offset_in_orig_idx;
    }

    let output_total_bytes = chunks_info
        .iter()
        .map(|chunk_info| chunk_info.padded_chunk_size)
        .sum::<usize>();

    let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);

    for chunk in chunks_info {
        // Rebase this chunk's offsets so they point at byte positions inside the
        // chunk itself (first value's bytes start at `bytes_start_offset`).
        let this_chunk_offsets: Vec<N> = offsets
            [chunk.chunk_start_offset_in_orig_idx..=chunk.chunk_last_offset_in_orig_idx]
            .iter()
            .map(|offset| {
                *offset - offsets[chunk.chunk_start_offset_in_orig_idx]
                    + N::from_usize(chunk.bytes_start_offset).unwrap()
            })
            .collect();

        let this_chunk_offsets = LanceBuffer::reinterpret_vec(this_chunk_offsets);
        output.extend_from_slice(&this_chunk_offsets);

        // Copy the raw value bytes covered by this chunk from the original data.
        let start_in_orig = offsets[chunk.chunk_start_offset_in_orig_idx]
            .to_usize()
            .unwrap();
        let end_in_orig = offsets[chunk.chunk_last_offset_in_orig_idx]
            .to_usize()
            .unwrap();
        output.extend_from_slice(&data[start_in_orig..end_in_orig]);

        // pad this chunk to make it align to desired bytes.
        // (the pad byte value is arbitrary; it is never read back)
        const PAD_BYTE: u8 = 72;
        let pad_len = pad_bytes_to(output.len(), alignment);

        // Compare with usize literal to avoid type mismatch with N
        if pad_len > 0_usize {
            output.extend(std::iter::repeat_n(PAD_BYTE, pad_len));
        }
    }
    (vec![LanceBuffer::reinterpret_vec(output)], chunks)
}
133
134// search for the next offset index to cut the values into a chunk.
135// this function incrementally peek the number of values in a chunk,
136// each time multiplies the number of values by 2.
137// It returns the offset_idx in `offsets` that belongs to this chunk.
138fn search_next_offset_idx<N: OffsetSizeTrait>(offsets: &[N], last_offset_idx: usize) -> usize {
139    let mut num_values = 1;
140    let mut new_num_values = num_values * 2;
141    loop {
142        if last_offset_idx + new_num_values >= offsets.len() {
143            let existing_bytes = offsets[offsets.len() - 1] - offsets[last_offset_idx];
144            // existing bytes plus the new offset size
145            let new_size = existing_bytes
146                + N::from_usize((offsets.len() - last_offset_idx) * N::get_byte_width()).unwrap();
147            if new_size.to_i64().unwrap() <= AIM_MINICHUNK_SIZE {
148                // case 1: can fit the rest of all data into a miniblock
149                return offsets.len() - 1;
150            } else {
151                // case 2: can only fit the last tried `num_values` into a miniblock
152                return last_offset_idx + num_values;
153            }
154        }
155        let existing_bytes = offsets[last_offset_idx + new_num_values] - offsets[last_offset_idx];
156        let new_size =
157            existing_bytes + N::from_usize((new_num_values + 1) * N::get_byte_width()).unwrap();
158        if new_size.to_i64().unwrap() <= AIM_MINICHUNK_SIZE {
159            num_values = new_num_values;
160            new_num_values *= 2;
161        } else {
162            break;
163        }
164    }
165    last_offset_idx + new_num_values
166}
167
168impl BinaryMiniBlockEncoder {
169    // put binary data into chunks, every chunk is less than or equal to `AIM_MINICHUNK_SIZE`.
170    // In each chunk, offsets are put first then followed by binary bytes data, each chunk is padded to 8 bytes.
171    // the offsets in the chunk points to the bytes offset in this chunk.
172    fn chunk_data(&self, data: VariableWidthBlock) -> (MiniBlockCompressed, CompressiveEncoding) {
173        // TODO: Support compression of offsets
174        // TODO: Support general compression of data
175        match data.bits_per_offset {
176            32 => {
177                let offsets = data.offsets.borrow_to_typed_slice::<i32>();
178                let (buffers, chunks) = chunk_offsets(offsets.as_ref(), &data.data, 4);
179                (
180                    MiniBlockCompressed {
181                        data: buffers,
182                        chunks,
183                        num_values: data.num_values,
184                    },
185                    ProtobufUtils21::variable(ProtobufUtils21::flat(32, None), None),
186                )
187            }
188            64 => {
189                let offsets = data.offsets.borrow_to_typed_slice::<i64>();
190                let (buffers, chunks) = chunk_offsets(offsets.as_ref(), &data.data, 8);
191                (
192                    MiniBlockCompressed {
193                        data: buffers,
194                        chunks,
195                        num_values: data.num_values,
196                    },
197                    ProtobufUtils21::variable(ProtobufUtils21::flat(64, None), None),
198                )
199            }
200            _ => panic!("Unsupported bits_per_offset={}", data.bits_per_offset),
201        }
202    }
203}
204
205impl MiniBlockCompressor for BinaryMiniBlockEncoder {
206    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
207        match data {
208            DataBlock::VariableWidth(variable_width) => Ok(self.chunk_data(variable_width)),
209            _ => Err(Error::InvalidInput {
210                source: format!(
211                    "Cannot compress a data block of type {} with BinaryMiniBlockEncoder",
212                    data.name()
213                )
214                .into(),
215                location: location!(),
216            }),
217        }
218    }
219}
220
/// Decompresses miniblock chunks produced by [`BinaryMiniBlockEncoder`].
#[derive(Debug)]
pub struct BinaryMiniBlockDecompressor {
    // width of the stored offsets; always 32 or 64
    bits_per_offset: u8,
}
225
226impl BinaryMiniBlockDecompressor {
227    pub fn new(bits_per_offset: u8) -> Self {
228        assert!(bits_per_offset == 32 || bits_per_offset == 64);
229        Self { bits_per_offset }
230    }
231
232    pub fn from_variable(variable: &pb21::Variable) -> Self {
233        if let Compression::Flat(flat) = variable
234            .offsets
235            .as_ref()
236            .unwrap()
237            .compression
238            .as_ref()
239            .unwrap()
240        {
241            Self {
242                bits_per_offset: flat.bits_per_value as u8,
243            }
244        } else {
245            panic!("Unsupported offsets compression: {:?}", variable.offsets);
246        }
247    }
248}
249
impl MiniBlockDecompressor for BinaryMiniBlockDecompressor {
    // Decompress a MiniBlock of binary data.  `num_values` must be less than or equal
    // to the number of values this MiniBlock has.  BinaryMiniBlock doesn't store `the
    // number of values` it has, so that assertion can not be done here and the caller
    // of `decompress` must ensure `num_values` <= number of values in the chunk.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        // This encoding always produces exactly one buffer per chunk.
        assert_eq!(data.len(), 1);
        let data = data.into_iter().next().unwrap();

        if self.bits_per_offset == 64 {
            // need at least two u64 offsets, i.e. offset and at least one value
            assert!(data.len() >= 16);

            // The chunk starts with the offsets region; offsets point at byte
            // positions inside this same chunk.
            let offsets_buffer = data.borrow_to_typed_slice::<u64>();
            let offsets = offsets_buffer.as_ref();

            // Rebase so the first requested value's bytes start at 0.
            let result_offsets = offsets[0..(num_values + 1) as usize]
                .iter()
                .map(|offset| offset - offsets[0])
                .collect::<Vec<u64>>();

            Ok(DataBlock::VariableWidth(VariableWidthBlock {
                // offsets[0] is where the value bytes begin within the chunk
                data: LanceBuffer::from(
                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
                ),
                offsets: LanceBuffer::reinterpret_vec(result_offsets),
                bits_per_offset: 64,
                num_values,
                block_info: BlockInfo::new(),
            }))
        } else {
            // need at least two u32 offsets, i.e. offset and at least one value
            assert!(data.len() >= 8);

            let offsets_buffer = data.borrow_to_typed_slice::<u32>();
            let offsets = offsets_buffer.as_ref();

            // Rebase so the first requested value's bytes start at 0.
            let result_offsets = offsets[0..(num_values + 1) as usize]
                .iter()
                .map(|offset| offset - offsets[0])
                .collect::<Vec<u32>>();

            Ok(DataBlock::VariableWidth(VariableWidthBlock {
                // offsets[0] is where the value bytes begin within the chunk
                data: LanceBuffer::from(
                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
                ),
                offsets: LanceBuffer::reinterpret_vec(result_offsets),
                bits_per_offset: 32,
                num_values,
                block_info: BlockInfo::new(),
            }))
        }
    }
}
304
/// Most basic encoding for variable-width data which does no compression at all.
/// The encoded buffer's memory layout looks like below:
/// ----------------------------------------------------------------------------------------
/// | bits_per_offset | number of values  | bytes_start_offset | offsets data | bytes data
/// ----------------------------------------------------------------------------------------
/// |       1 byte    |<bits_per_offset>/8|<bits_per_offset>/8 |  offsets_len | data_len
/// ----------------------------------------------------------------------------------------
/// It's used in VariableEncoder and BinaryBlockDecompressor
///
#[derive(Debug, Default)]
pub struct VariableEncoder {}
316
impl BlockCompressor for VariableEncoder {
    // Serializes a variable-width block into a single self-describing buffer
    // using the layout documented on `VariableEncoder`:
    // [bits_per_offset: 1 byte][num_values][bytes_start_offset][offsets][bytes].
    fn compress(&self, mut data: DataBlock) -> Result<LanceBuffer> {
        match data {
            DataBlock::VariableWidth(ref mut variable_width_data) => {
                let num_values: u64 = variable_width_data.num_values;
                match variable_width_data.bits_per_offset {
                    32 => {
                        let num_values: u32 = num_values
            .try_into()
            .expect("The Maximum number of values BinaryBlockEncoder can work with is u32::MAX");

                        let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u32>();
                        let offsets = offsets.as_ref();
                        // the first byte stores the bits_per_offset, the next 4 bytes store the number of values,
                        // then 4 bytes for bytes_start_offset,
                        // then offsets data, then bytes data.
                        let bytes_start_offset = 1 + 4 + 4 + std::mem::size_of_val(offsets) as u32;

                        let output_total_bytes =
                            bytes_start_offset as usize + variable_width_data.data.len();
                        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);

                        // Store bits_per_offset info in the first byte
                        output.push(32_u8);

                        // store `num_values` in the next 4 bytes of output buffer
                        output.extend_from_slice(&(num_values).to_le_bytes());

                        // store `bytes_start_offset` in the next 4 bytes of output buffer
                        output.extend_from_slice(&(bytes_start_offset).to_le_bytes());

                        // store offsets
                        output.extend_from_slice(cast_slice(offsets));

                        // store bytes
                        output.extend_from_slice(&variable_width_data.data);
                        Ok(LanceBuffer::from(output))
                    }
                    64 => {
                        let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u64>();
                        let offsets = offsets.as_ref();
                        // the first byte stores the bits_per_offset, the next 8 bytes store the number of values,
                        // then 8 bytes for bytes_start_offset,
                        // then offsets data, then bytes data.

                        let bytes_start_offset = 1 + 8 + 8 + std::mem::size_of_val(offsets) as u64;

                        let output_total_bytes =
                            bytes_start_offset as usize + variable_width_data.data.len();
                        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);

                        // Store bits_per_offset info in the first byte
                        output.push(64_u8);

                        // store `num_values` in the next 8 bytes of output buffer
                        output.extend_from_slice(&(num_values).to_le_bytes());

                        // store `bytes_start_offset` in the next 8 bytes of output buffer
                        output.extend_from_slice(&(bytes_start_offset).to_le_bytes());

                        // store offsets
                        output.extend_from_slice(cast_slice(offsets));

                        // store bytes
                        output.extend_from_slice(&variable_width_data.data);
                        Ok(LanceBuffer::from(output))
                    }
                    _ => {
                        panic!("BinaryBlockEncoder does not work with {} bits per offset VariableWidth DataBlock.",
                variable_width_data.bits_per_offset);
                    }
                }
            }
            _ => {
                panic!("BinaryBlockEncoder can only work with Variable Width DataBlock.");
            }
        }
    }
}
396
397impl PerValueCompressor for VariableEncoder {
398    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
399        let DataBlock::VariableWidth(variable) = data else {
400            panic!("BinaryPerValueCompressor can only work with Variable Width DataBlock.");
401        };
402
403        let encoding = ProtobufUtils21::variable(
404            ProtobufUtils21::flat(variable.bits_per_offset as u64, None),
405            None,
406        );
407        Ok((PerValueDataBlock::Variable(variable), encoding))
408    }
409}
410
/// Decoder counterpart of [`VariableEncoder`] for the per-value (full-zip) path.
#[derive(Debug, Default)]
pub struct VariableDecoder {}

impl VariablePerValueDecompressor for VariableDecoder {
    // The per-value path stores variable-width data uncompressed, so
    // decompression is a pass-through.
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
        Ok(DataBlock::VariableWidth(data))
    }
}
419
/// Decompresses buffers produced by [`VariableEncoder`]'s `BlockCompressor` impl.
#[derive(Debug, Default)]
pub struct BinaryBlockDecompressor {}
422
impl BlockDecompressor for BinaryBlockDecompressor {
    // Parses the self-describing layout written by `VariableEncoder`:
    // [bits_per_offset: 1 byte][num_values][bytes_start_offset][offsets][bytes].
    // The offsets and bytes regions are zero-copy slices of the input buffer.
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        // The first byte contains bits_per_offset info
        let bits_per_offset = data[0];
        match bits_per_offset {
            32 => {
                // the next 4 bytes in the BinaryBlock compressed buffer stores the num_values this block has.
                let stored_num_values = LittleEndian::read_u32(&data[1..5]);
                debug_assert_eq!(num_values, stored_num_values as u64);

                // the next 4 bytes in the BinaryBlock compressed buffer stores the bytes_start_offset.
                let bytes_start_offset = LittleEndian::read_u32(&data[5..9]);

                // the next `bytes_start_offset - 9` bytes store the offsets (header is 1 + 4 + 4 = 9 bytes).
                let offsets = data.slice_with_length(9, bytes_start_offset as usize - 9);

                // the rest are the binary bytes.
                let data = data.slice_with_length(
                    bytes_start_offset as usize,
                    data.len() - bytes_start_offset as usize,
                );

                Ok(DataBlock::VariableWidth(VariableWidthBlock {
                    data,
                    offsets,
                    bits_per_offset: 32,
                    num_values,
                    block_info: BlockInfo::new(),
                }))
            }
            64 => {
                // the next 8 bytes in the BinaryBlock compressed buffer stores the num_values this block has.
                let stored_num_values = LittleEndian::read_u64(&data[1..9]);
                debug_assert_eq!(num_values, stored_num_values);

                // the next 8 bytes in the BinaryBlock compressed buffer stores the bytes_start_offset.
                let bytes_start_offset = LittleEndian::read_u64(&data[9..17]);

                // the next `bytes_start_offset - 17` bytes store the offsets (header is 1 + 8 + 8 = 17 bytes).
                let offsets = data.slice_with_length(17, bytes_start_offset as usize - 17);

                // the rest are the binary bytes.
                let data = data.slice_with_length(
                    bytes_start_offset as usize,
                    data.len() - bytes_start_offset as usize,
                );

                Ok(DataBlock::VariableWidth(VariableWidthBlock {
                    data,
                    offsets,
                    bits_per_offset: 64,
                    num_values,
                    block_info: BlockInfo::new(),
                }))
            }
            _ => panic!("Unsupported bits_per_offset={}", bits_per_offset),
        }
    }
}
482
483#[cfg(test)]
484pub mod tests {
485    use arrow_array::{
486        builder::{LargeStringBuilder, StringBuilder},
487        ArrayRef, StringArray,
488    };
489    use arrow_schema::{DataType, Field};
490
491    use crate::constants::{
492        COMPRESSION_META_KEY, STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY,
493        STRUCTURAL_ENCODING_MINIBLOCK,
494    };
495    use rstest::rstest;
496    use std::{collections::HashMap, sync::Arc, vec};
497
498    use crate::{
499        testing::{
500            check_round_trip_encoding_generated, check_round_trip_encoding_of_data,
501            check_round_trip_encoding_random, FnArrayGeneratorProvider, TestCases,
502        },
503        version::LanceFileVersion,
504    };
505
    // Round-trips random Utf8 data through both file versions.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_utf8_binary(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let field = Field::new("", DataType::Utf8, false);
        check_round_trip_encoding_random(field, version).await;
    }

    // Round-trips random string/binary data across versions and both
    // structural encodings (miniblock and fullzip).
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let field = Field::new("", data_type, false).with_metadata(field_metadata);
        check_round_trip_encoding_random(field, version).await;
    }
532
    // Round-trips string/binary data with FSST compression requested via
    // field metadata (v2.1 only, both structural encodings).
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary_fsst(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Binary, DataType::Utf8)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
        let field = Field::new("", data_type, true).with_metadata(field_metadata);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_1).await;
    }

    // Same as test_binary_fsst but for the 64-bit-offset (Large*) types.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_large_binary_fsst(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::LargeBinary, DataType::LargeUtf8)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
        let field = Field::new("", data_type, true).with_metadata(field_metadata);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_1).await;
    }
566
    // Round-trips random LargeBinary (64-bit offsets) data through both versions.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_large_binary(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let field = Field::new("", DataType::LargeBinary, true);
        check_round_trip_encoding_random(field, version).await;
    }

    // Round-trips random LargeUtf8 data (v2.0 path).
    #[test_log::test(tokio::test)]
    async fn test_large_utf8() {
        let field = Field::new("", DataType::LargeUtf8, true);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    // Round-trips generated short strings ("user_<counter>") through both
    // structural encodings in v2.1.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_small_strings(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        let field = Field::new("", DataType::Utf8, true).with_metadata(field_metadata);
        check_round_trip_encoding_generated(
            field,
            Box::new(FnArrayGeneratorProvider::new(move || {
                lance_datagen::array::utf8_prefix_plus_counter("user_", /*is_large=*/ false)
            })),
            LanceFileVersion::V2_1,
        )
        .await;
    }
603
    // Round-trips a small, fixed array containing nulls, exercising range and
    // index reads across versions, structural encodings, and data types.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_binary(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
    ) {
        let string_array = StringArray::from(vec![Some("abc"), None, Some("pqr"), None, Some("m")]);
        let string_array = arrow_cast::cast(&string_array, &data_type).unwrap();

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![0, 1, 3, 4])
            .with_file_version(version);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            field_metadata,
        )
        .await;
    }

    // Ensures a sliced (non-zero-offset) string array round-trips correctly.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_sliced_utf8(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let string_array = StringArray::from(vec![Some("abc"), Some("de"), None, Some("fgh")]);
        let string_array = string_array.slice(1, 3);

        let test_cases = TestCases::default()
            .with_range(0..1)
            .with_range(0..2)
            .with_range(1..2)
            .with_file_version(version);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;
    }
655
    // Verifies values larger than the max page size are handled, and that
    // pages split into a part count that doesn't evenly divide the row count.
    #[test_log::test(tokio::test)]
    async fn test_bigger_than_max_page_size() {
        // Create an array with one single 32MiB string
        let big_string = String::from_iter((0..(32 * 1024 * 1024)).map(|_| '0'));
        let string_array = StringArray::from(vec![
            Some(big_string),
            Some("abc".to_string()),
            None,
            None,
            Some("xyz".to_string()),
        ]);

        // Drop the max page size to 1MiB
        let test_cases = TestCases::default().with_max_page_size(1024 * 1024);

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;

        // This is a regression testing the case where a page with X rows is split into Y parts
        // where the number of parts is not evenly divisible by the number of rows.  In this
        // case we are splitting 90 rows into 4 parts.
        let big_string = String::from_iter((0..(1000 * 1000)).map(|_| '0'));
        let string_array = StringArray::from_iter_values((0..90).map(|_| big_string.clone()));

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &TestCases::default(),
            HashMap::new(),
        )
        .await;
    }
691
    // Verifies empty strings round-trip correctly, both mixed with non-empty
    // values/nulls (in every position) and as an all-empty array.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_empty_strings(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        // Scenario 1: Some strings are empty

        let values = [Some("abc"), Some(""), None];
        // Test empty list at beginning, middle, and end
        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
            let mut string_builder = StringBuilder::new();
            for idx in order {
                string_builder.append_option(values[idx]);
            }
            let string_array = Arc::new(string_builder.finish());
            let test_cases = TestCases::default()
                .with_indices(vec![1])
                .with_indices(vec![0])
                .with_indices(vec![2])
                .with_indices(vec![0, 1])
                .with_file_version(version);
            check_round_trip_encoding_of_data(
                vec![string_array.clone()],
                &test_cases,
                HashMap::new(),
            )
            .await;
            // Re-run with batch size 1 to exercise per-row decoding.
            let test_cases = test_cases.with_batch_size(1);
            check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new())
                .await;
        }

        // Scenario 2: All strings are empty

        // When encoding an array of empty strings there are no bytes to encode
        // which is strange and we want to ensure we handle it
        let string_array = Arc::new(StringArray::from(vec![Some(""), None, Some("")]));

        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
        check_round_trip_encoding_of_data(vec![string_array.clone()], &test_cases, HashMap::new())
            .await;
        let test_cases = test_cases.with_batch_size(1);
        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }
736
    #[rstest]
    #[test_log::test(tokio::test)]
    #[ignore] // This test is quite slow in debug mode
    async fn test_jumbo_string(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        // This is an overflow test.  We have a list of lists where each list
        // has 1Mi items.  We encode 5000 of these lists and so we have over 4Gi in the
        // offsets range
        let mut string_builder = LargeStringBuilder::new();
        // a 1 MiB string
        let giant_string = String::from_iter((0..(1024 * 1024)).map(|_| '0'));
        for _ in 0..5000 {
            string_builder.append_option(Some(&giant_string));
        }
        let giant_array = Arc::new(string_builder.finish()) as ArrayRef;
        let arrs = vec![giant_array];

        // We can't validate because our validation relies on concatenating all input arrays
        let test_cases = TestCases::default()
            .without_validation()
            .with_file_version(version);
        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
    }
761
762    #[rstest]
763    #[test_log::test(tokio::test)]
764    async fn test_binary_dictionary_encoding(
765        #[values(true, false)] with_nulls: bool,
766        #[values(100, 500, 35000)] dict_size: u32,
767    ) {
768        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
769        let strings = (0..dict_size)
770            .map(|i| i.to_string())
771            .collect::<Vec<String>>();
772
773        let repeated_strings: Vec<_> = strings
774            .iter()
775            .cycle()
776            .take(70000)
777            .enumerate()
778            .map(|(i, s)| {
779                if with_nulls && i % 7 == 0 {
780                    None
781                } else {
782                    Some(s.clone())
783                }
784            })
785            .collect();
786        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
787        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
788    }
789
790    #[test_log::test(tokio::test)]
791    async fn test_binary_encoding_verification() {
792        use lance_datagen::{ByteCount, RowCount};
793
794        let test_cases = TestCases::default()
795            .with_expected_encoding("variable")
796            .with_file_version(LanceFileVersion::V2_1);
797
798        // Test both automatic selection and explicit configuration
799        // 1. Test automatic binary encoding selection (small strings that won't trigger FSST)
800        let arr_small = lance_datagen::gen_batch()
801            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(10), false))
802            .into_batch_rows(RowCount::from(1000))
803            .unwrap()
804            .column(0)
805            .clone();
806        check_round_trip_encoding_of_data(vec![arr_small], &test_cases, HashMap::new()).await;
807
808        // 2. Test explicit "none" compression to force binary encoding
809        let metadata_explicit =
810            HashMap::from([("lance-encoding:compression".to_string(), "none".to_string())]);
811        let arr_large = lance_datagen::gen_batch()
812            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(50), false))
813            .into_batch_rows(RowCount::from(2000))
814            .unwrap()
815            .column(0)
816            .clone();
817        check_round_trip_encoding_of_data(vec![arr_large], &test_cases, metadata_explicit).await;
818    }
819
#[test]
fn test_binary_miniblock_with_misaligned_buffer() {
    use super::BinaryMiniBlockDecompressor;
    use crate::buffer::LanceBuffer;
    use crate::compression::MiniBlockDecompressor;
    use crate::data::DataBlock;

    // Shared driver for both offset widths.  Prepends `misalign_pad` junk
    // bytes and slices them off so the payload no longer starts on a word
    // boundary, then decompresses 2 values and checks the string bytes.
    //
    // BinaryMiniBlock chunk format: all offsets first, then the string data,
    // so `chunk` must already be laid out that way and its total length must
    // be a multiple of the offset width.
    fn check_misaligned_decompress(
        bits_per_offset: u8,
        misalign_pad: usize,
        chunk: &[u8],
        expected_strings: &[u8],
    ) {
        let decompressor = BinaryMiniBlockDecompressor { bits_per_offset };

        // Build the misaligned view of `chunk`
        let mut padded = vec![0xFFu8; misalign_pad];
        padded.extend_from_slice(chunk);
        let bytes = bytes::Bytes::from(padded);
        let misaligned = bytes.slice(misalign_pad..); // same allocation, shifted start

        // bytes_per_value=1 bypasses LanceBuffer's alignment check
        let buffer = LanceBuffer::from_bytes(misaligned, 1);

        // Verify the test setup actually produced a misaligned pointer
        let alignment = bits_per_offset as usize / 8;
        assert_ne!(
            buffer.as_ref().as_ptr().align_offset(alignment),
            0,
            "Test setup: buffer should be misaligned for {}-bit offsets",
            bits_per_offset
        );

        // Decompression should succeed despite the misalignment
        // (the decompressor relies on borrow_to_typed_slice internally)
        match decompressor.decompress(vec![buffer], 2) {
            Ok(DataBlock::VariableWidth(block)) => {
                assert_eq!(block.num_values, 2);
                // Data should be the strings (plus any trailing padding from the chunk)
                assert_eq!(
                    &block.data.as_ref()[..expected_strings.len()],
                    expected_strings
                );
            }
            Ok(_) => panic!("Expected VariableWidth block"),
            Err(e) => panic!(
                "Decompression should succeed with misaligned buffer: {:?}",
                e
            ),
        }
    }

    // Test case 1: u32 offsets
    // 3 offsets (2 values + end) = 12 bytes, then "ABCXYZ" + 2 bytes padding
    // so the total (20) is divisible by 4.
    let mut chunk32 = Vec::new();
    chunk32.extend_from_slice(&12u32.to_le_bytes()); // start of string data (after offsets)
    chunk32.extend_from_slice(&15u32.to_le_bytes()); // start of second string
    chunk32.extend_from_slice(&20u32.to_le_bytes()); // end offset
    chunk32.extend_from_slice(b"ABCXYZ");
    chunk32.extend_from_slice(&[0, 0]);
    check_misaligned_decompress(32, 1, &chunk32, b"ABCXYZ");

    // Test case 2: u64 offsets
    // 3 offsets = 24 bytes, then "HelloWorld" + 6 bytes padding so the
    // total (40) is divisible by 8.
    let mut chunk64 = Vec::new();
    chunk64.extend_from_slice(&24u64.to_le_bytes()); // start of string data (after offsets)
    chunk64.extend_from_slice(&29u64.to_le_bytes()); // start of second string
    chunk64.extend_from_slice(&40u64.to_le_bytes()); // end offset
    chunk64.extend_from_slice(b"HelloWorld");
    chunk64.extend_from_slice(&[0, 0, 0, 0, 0, 0]);
    check_misaligned_decompress(64, 3, &chunk64, b"HelloWorld");
}
935}