lance_encoding/encodings/physical/
binary.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Basic encodings for variable width data
5//!
6//! These are not compression but represent the "leaf" encodings for variable length data
7//! where we simply match the data with the rules of the structural encoding.
8//!
9//! These encodings are transparent since we aren't actually doing any compression.  No information
10//! is needed in the encoding description.
11
12use arrow_array::OffsetSizeTrait;
13use byteorder::{ByteOrder, LittleEndian};
14use core::panic;
15use snafu::location;
16
17use crate::compression::{
18    BlockCompressor, BlockDecompressor, MiniBlockDecompressor, VariablePerValueDecompressor,
19};
20
21use crate::buffer::LanceBuffer;
22use crate::data::{BlockInfo, DataBlock, VariableWidthBlock};
23use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
24use crate::encodings::logical::primitive::miniblock::{
25    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
26};
27use crate::format::pb21::compressive_encoding::Compression;
28use crate::format::pb21::CompressiveEncoding;
29use crate::format::{pb21, ProtobufUtils21};
30
31use lance_core::utils::bit::pad_bytes_to;
32use lance_core::{Error, Result};
33
34#[derive(Debug, Default)]
35pub struct BinaryMiniBlockEncoder {}
36
37const AIM_MINICHUNK_SIZE: i64 = 4 * 1024;
38
39// Make it to support both u32 and u64
40fn chunk_offsets<N: OffsetSizeTrait>(
41    offsets: &[N],
42    data: &[u8],
43    alignment: usize,
44) -> (Vec<LanceBuffer>, Vec<MiniBlockChunk>) {
45    #[derive(Debug)]
46    struct ChunkInfo {
47        chunk_start_offset_in_orig_idx: usize,
48        chunk_last_offset_in_orig_idx: usize,
49        // the bytes in every chunk starts at `chunk.bytes_start_offset`
50        bytes_start_offset: usize,
51        // every chunk is padded to 8 bytes.
52        // we need to interpret every chunk as &[u32] so we need it to padded at least to 4 bytes,
53        // this field can actually be eliminated and I can use `num_bytes` in `MiniBlockChunk` to compute
54        // the `output_total_bytes`.
55        padded_chunk_size: usize,
56    }
57
58    let byte_width: usize = N::get_byte_width();
59    let mut chunks_info = vec![];
60    let mut chunks = vec![];
61    let mut last_offset_in_orig_idx = 0;
62    loop {
63        let this_last_offset_in_orig_idx = search_next_offset_idx(offsets, last_offset_in_orig_idx);
64
65        let num_values_in_this_chunk = this_last_offset_in_orig_idx - last_offset_in_orig_idx;
66        let chunk_bytes = offsets[this_last_offset_in_orig_idx] - offsets[last_offset_in_orig_idx];
67        let this_chunk_size =
68            (num_values_in_this_chunk + 1) * byte_width + chunk_bytes.to_usize().unwrap();
69
70        let padded_chunk_size = this_chunk_size.next_multiple_of(alignment);
71        debug_assert!(padded_chunk_size > 0);
72
73        let this_chunk_bytes_start_offset = (num_values_in_this_chunk + 1) * byte_width;
74        chunks_info.push(ChunkInfo {
75            chunk_start_offset_in_orig_idx: last_offset_in_orig_idx,
76            chunk_last_offset_in_orig_idx: this_last_offset_in_orig_idx,
77            bytes_start_offset: this_chunk_bytes_start_offset,
78            padded_chunk_size,
79        });
80        chunks.push(MiniBlockChunk {
81            log_num_values: if this_last_offset_in_orig_idx == offsets.len() - 1 {
82                0
83            } else {
84                num_values_in_this_chunk.trailing_zeros() as u8
85            },
86            buffer_sizes: vec![padded_chunk_size as u16],
87        });
88        if this_last_offset_in_orig_idx == offsets.len() - 1 {
89            break;
90        }
91        last_offset_in_orig_idx = this_last_offset_in_orig_idx;
92    }
93
94    let output_total_bytes = chunks_info
95        .iter()
96        .map(|chunk_info| chunk_info.padded_chunk_size)
97        .sum::<usize>();
98
99    let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
100
101    for chunk in chunks_info {
102        let this_chunk_offsets: Vec<N> = offsets
103            [chunk.chunk_start_offset_in_orig_idx..=chunk.chunk_last_offset_in_orig_idx]
104            .iter()
105            .map(|offset| {
106                *offset - offsets[chunk.chunk_start_offset_in_orig_idx]
107                    + N::from_usize(chunk.bytes_start_offset).unwrap()
108            })
109            .collect();
110
111        let this_chunk_offsets = LanceBuffer::reinterpret_vec(this_chunk_offsets);
112        output.extend_from_slice(&this_chunk_offsets);
113
114        let start_in_orig = offsets[chunk.chunk_start_offset_in_orig_idx]
115            .to_usize()
116            .unwrap();
117        let end_in_orig = offsets[chunk.chunk_last_offset_in_orig_idx]
118            .to_usize()
119            .unwrap();
120        output.extend_from_slice(&data[start_in_orig..end_in_orig]);
121
122        // pad this chunk to make it align to desired bytes.
123        const PAD_BYTE: u8 = 72;
124        let pad_len = pad_bytes_to(output.len(), alignment);
125
126        // Compare with usize literal to avoid type mismatch with N
127        if pad_len > 0_usize {
128            output.extend(std::iter::repeat_n(PAD_BYTE, pad_len));
129        }
130    }
131    (vec![LanceBuffer::reinterpret_vec(output)], chunks)
132}
133
134// search for the next offset index to cut the values into a chunk.
135// this function incrementally peek the number of values in a chunk,
136// each time multiplies the number of values by 2.
137// It returns the offset_idx in `offsets` that belongs to this chunk.
138fn search_next_offset_idx<N: OffsetSizeTrait>(offsets: &[N], last_offset_idx: usize) -> usize {
139    let mut num_values = 1;
140    let mut new_num_values = num_values * 2;
141    loop {
142        if last_offset_idx + new_num_values >= offsets.len() {
143            let existing_bytes = offsets[offsets.len() - 1] - offsets[last_offset_idx];
144            // existing bytes plus the new offset size
145            let new_size = existing_bytes
146                + N::from_usize((offsets.len() - last_offset_idx) * N::get_byte_width()).unwrap();
147            if new_size.to_i64().unwrap() <= AIM_MINICHUNK_SIZE {
148                // case 1: can fit the rest of all data into a miniblock
149                return offsets.len() - 1;
150            } else {
151                // case 2: can only fit the last tried `num_values` into a miniblock
152                return last_offset_idx + num_values;
153            }
154        }
155        let existing_bytes = offsets[last_offset_idx + new_num_values] - offsets[last_offset_idx];
156        let new_size =
157            existing_bytes + N::from_usize((new_num_values + 1) * N::get_byte_width()).unwrap();
158        if new_size.to_i64().unwrap() <= AIM_MINICHUNK_SIZE {
159            num_values = new_num_values;
160            new_num_values *= 2;
161        } else {
162            break;
163        }
164    }
165    last_offset_idx + new_num_values
166}
167
168impl BinaryMiniBlockEncoder {
169    // put binary data into chunks, every chunk is less than or equal to `AIM_MINICHUNK_SIZE`.
170    // In each chunk, offsets are put first then followed by binary bytes data, each chunk is padded to 8 bytes.
171    // the offsets in the chunk points to the bytes offset in this chunk.
172    fn chunk_data(&self, data: VariableWidthBlock) -> (MiniBlockCompressed, CompressiveEncoding) {
173        // TODO: Support compression of offsets
174        // TODO: Support general compression of data
175        match data.bits_per_offset {
176            32 => {
177                let offsets = data.offsets.borrow_to_typed_slice::<i32>();
178                let (buffers, chunks) = chunk_offsets(offsets.as_ref(), &data.data, 4);
179                (
180                    MiniBlockCompressed {
181                        data: buffers,
182                        chunks,
183                        num_values: data.num_values,
184                    },
185                    ProtobufUtils21::variable(ProtobufUtils21::flat(32, None), None),
186                )
187            }
188            64 => {
189                let offsets = data.offsets.borrow_to_typed_slice::<i64>();
190                let (buffers, chunks) = chunk_offsets(offsets.as_ref(), &data.data, 8);
191                (
192                    MiniBlockCompressed {
193                        data: buffers,
194                        chunks,
195                        num_values: data.num_values,
196                    },
197                    ProtobufUtils21::variable(ProtobufUtils21::flat(64, None), None),
198                )
199            }
200            _ => panic!("Unsupported bits_per_offset={}", data.bits_per_offset),
201        }
202    }
203}
204
205impl MiniBlockCompressor for BinaryMiniBlockEncoder {
206    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
207        match data {
208            DataBlock::VariableWidth(variable_width) => Ok(self.chunk_data(variable_width)),
209            _ => Err(Error::InvalidInput {
210                source: format!(
211                    "Cannot compress a data block of type {} with BinaryMiniBlockEncoder",
212                    data.name()
213                )
214                .into(),
215                location: location!(),
216            }),
217        }
218    }
219}
220
221#[derive(Debug)]
222pub struct BinaryMiniBlockDecompressor {
223    bits_per_offset: u8,
224}
225
226impl BinaryMiniBlockDecompressor {
227    pub fn new(bits_per_offset: u8) -> Self {
228        assert!(bits_per_offset == 32 || bits_per_offset == 64);
229        Self { bits_per_offset }
230    }
231
232    pub fn from_variable(variable: &pb21::Variable) -> Self {
233        if let Compression::Flat(flat) = variable
234            .offsets
235            .as_ref()
236            .unwrap()
237            .compression
238            .as_ref()
239            .unwrap()
240        {
241            Self {
242                bits_per_offset: flat.bits_per_value as u8,
243            }
244        } else {
245            panic!("Unsupported offsets compression: {:?}", variable.offsets);
246        }
247    }
248}
249
250impl MiniBlockDecompressor for BinaryMiniBlockDecompressor {
251    // decompress a MiniBlock of binary data, the num_values must be less than or equal
252    // to the number of values this MiniBlock has, BinaryMiniBlock doesn't store `the number of values`
253    // it has so assertion can not be done here and the caller of `decompress` must ensure
254    // `num_values` <= number of values in the chunk.
255    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
256        assert_eq!(data.len(), 1);
257        let data = data.into_iter().next().unwrap();
258
259        if self.bits_per_offset == 64 {
260            // offset and at least one value
261            assert!(data.len() >= 16);
262
263            let offsets_buffer = data.borrow_to_typed_slice::<u64>();
264            let offsets = offsets_buffer.as_ref();
265
266            let result_offsets = offsets[0..(num_values + 1) as usize]
267                .iter()
268                .map(|offset| offset - offsets[0])
269                .collect::<Vec<u64>>();
270
271            Ok(DataBlock::VariableWidth(VariableWidthBlock {
272                data: LanceBuffer::from(
273                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
274                ),
275                offsets: LanceBuffer::reinterpret_vec(result_offsets),
276                bits_per_offset: 64,
277                num_values,
278                block_info: BlockInfo::new(),
279            }))
280        } else {
281            // offset and at least one value
282            assert!(data.len() >= 8);
283
284            let offsets_buffer = data.borrow_to_typed_slice::<u32>();
285            let offsets = offsets_buffer.as_ref();
286
287            let result_offsets = offsets[0..(num_values + 1) as usize]
288                .iter()
289                .map(|offset| offset - offsets[0])
290                .collect::<Vec<u32>>();
291
292            Ok(DataBlock::VariableWidth(VariableWidthBlock {
293                data: LanceBuffer::from(
294                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
295                ),
296                offsets: LanceBuffer::reinterpret_vec(result_offsets),
297                bits_per_offset: 32,
298                num_values,
299                block_info: BlockInfo::new(),
300            }))
301        }
302    }
303}
304
305/// Most basic encoding for variable-width data which does no compression at all
306/// The DataBlock memory layout looks like below:
307///
308/// | bits_per_offset           | bytes_start_offset        | offsets data | bytes data |
309/// | ------------------------- | ------------------------- | ------------ | ---------- |
310/// | <bits_per_offset>/8 bytes | <bits_per_offset>/8 bytes | offsets_len  | data_len   |
311///
312/// It's used in VariableEncoder and BinaryBlockDecompressor
313///
314#[derive(Debug, Default)]
315pub struct VariableEncoder {}
316
317impl BlockCompressor for VariableEncoder {
318    fn compress(&self, mut data: DataBlock) -> Result<LanceBuffer> {
319        match data {
320            DataBlock::VariableWidth(ref mut variable_width_data) => {
321                match variable_width_data.bits_per_offset {
322                    32 => {
323                        let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u32>();
324                        let offsets = offsets.as_ref();
325                        // The first 4 bytes store the bits per offset, the next 4 bytes store the start
326                        // offset of the bytes data, then offsets data, then bytes data.
327                        let bytes_start_offset = 4 + 4 + std::mem::size_of_val(offsets) as u32;
328
329                        let output_total_bytes =
330                            bytes_start_offset as usize + variable_width_data.data.len();
331                        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
332
333                        // Store bit_per_offset info
334                        output.extend_from_slice(&(32_u32).to_le_bytes());
335
336                        // store `bytes_start_offset` in the next 4 bytes of output buffer
337                        output.extend_from_slice(&(bytes_start_offset).to_le_bytes());
338
339                        // store offsets
340                        output.extend_from_slice(&variable_width_data.offsets);
341
342                        // store bytes
343                        output.extend_from_slice(&variable_width_data.data);
344                        Ok(LanceBuffer::from(output))
345                    }
346                    64 => {
347                        let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u64>();
348                        let offsets = offsets.as_ref();
349                        // The first 8 bytes store the bits per offset, the next 8 bytes store the start
350                        // offset of the bytes data, then offsets data, then bytes data.
351                        let bytes_start_offset = 8 + 8 + std::mem::size_of_val(offsets) as u64;
352
353                        let output_total_bytes =
354                            bytes_start_offset as usize + variable_width_data.data.len();
355                        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
356
357                        // Store bit_per_offset info
358                        output.extend_from_slice(&(64_u64).to_le_bytes());
359
360                        // store `bytes_start_offset` in the next 8 bytes of output buffer
361                        output.extend_from_slice(&(bytes_start_offset).to_le_bytes());
362
363                        // store offsets
364                        output.extend_from_slice(&variable_width_data.offsets);
365
366                        // store bytes
367                        output.extend_from_slice(&variable_width_data.data);
368                        Ok(LanceBuffer::from(output))
369                    }
370                    _ => {
371                        panic!("BinaryBlockEncoder does not work with {} bits per offset VariableWidth DataBlock.",
372                variable_width_data.bits_per_offset);
373                    }
374                }
375            }
376            _ => {
377                panic!("BinaryBlockEncoder can only work with Variable Width DataBlock.");
378            }
379        }
380    }
381}
382
383impl PerValueCompressor for VariableEncoder {
384    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
385        let DataBlock::VariableWidth(variable) = data else {
386            panic!("BinaryPerValueCompressor can only work with Variable Width DataBlock.");
387        };
388
389        let encoding = ProtobufUtils21::variable(
390            ProtobufUtils21::flat(variable.bits_per_offset as u64, None),
391            None,
392        );
393        Ok((PerValueDataBlock::Variable(variable), encoding))
394    }
395}
396
397#[derive(Debug, Default)]
398pub struct VariableDecoder {}
399
400impl VariablePerValueDecompressor for VariableDecoder {
401    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
402        Ok(DataBlock::VariableWidth(data))
403    }
404}
405
406#[derive(Debug, Default)]
407pub struct BinaryBlockDecompressor {}
408
409impl BlockDecompressor for BinaryBlockDecompressor {
410    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
411        // In older (not quite stable) versions we stored the bits per offset as a single byte and then the num_values
412        // as four bytes.  However, this led to alignment problems and was wasteful since we already store the num_values
413        // in higher layers.
414        //
415        // In the standard scheme we use 4 bytes for the bits per offset and 4 bytes for the bytes_start_offset and we
416        // rely on the passed in num_values to be correct.
417
418        // This isn't perfect but it's probably good enough and the best I think we can do.  The bits per offset will
419        // never be more than 255 and it's little endian so the last 3 bytes will always be 0.  These will be the least
420        // significant 3 bytes of the number of values in the old scheme.  It's pretty unlikely these are all 0 (that would
421        // mean there are at least 16M values in a single page) so we'll use this to determine if the old scheme is used.
422        let is_old_scheme = data[1] != 0 || data[2] != 0 || data[3] != 0;
423
424        let (bits_per_offset, bytes_start_offset, offset_start) = if is_old_scheme {
425            // Old scheme
426            let bits_per_offset = data[0];
427            match bits_per_offset {
428                32 => {
429                    debug_assert_eq!(LittleEndian::read_u32(&data[1..5]), num_values as u32);
430                    let bytes_start_offset = LittleEndian::read_u32(&data[5..9]);
431                    (bits_per_offset, bytes_start_offset as u64, 9)
432                }
433                64 => {
434                    debug_assert_eq!(LittleEndian::read_u64(&data[1..9]), num_values);
435                    let bytes_start_offset = LittleEndian::read_u64(&data[9..17]);
436                    (bits_per_offset, bytes_start_offset, 17)
437                }
438                _ => {
439                    return Err(Error::InvalidInput {
440                        source: format!("Unsupported bits_per_offset={}", bits_per_offset).into(),
441                        location: location!(),
442                    });
443                }
444            }
445        } else {
446            // Standard scheme
447            let bits_per_offset = LittleEndian::read_u32(&data[0..4]) as u8;
448            match bits_per_offset {
449                32 => {
450                    let bytes_start_offset = LittleEndian::read_u32(&data[4..8]);
451                    (bits_per_offset, bytes_start_offset as u64, 8)
452                }
453                64 => {
454                    let bytes_start_offset = LittleEndian::read_u64(&data[8..16]);
455                    (bits_per_offset, bytes_start_offset, 16)
456                }
457                _ => {
458                    return Err(Error::InvalidInput {
459                        source: format!("Unsupported bits_per_offset={}", bits_per_offset).into(),
460                        location: location!(),
461                    });
462                }
463            }
464        };
465
466        // the next `bytes_start_offset - offset_start` stores the offsets.
467        let offsets =
468            data.slice_with_length(offset_start, bytes_start_offset as usize - offset_start);
469
470        // the rest are the binary bytes.
471        let data = data.slice_with_length(
472            bytes_start_offset as usize,
473            data.len() - bytes_start_offset as usize,
474        );
475
476        Ok(DataBlock::VariableWidth(VariableWidthBlock {
477            data,
478            offsets,
479            bits_per_offset,
480            num_values,
481            block_info: BlockInfo::new(),
482        }))
483    }
484}
485
486#[cfg(test)]
487pub mod tests {
488    use arrow_array::{
489        builder::{LargeStringBuilder, StringBuilder},
490        ArrayRef, StringArray,
491    };
492    use arrow_schema::{DataType, Field};
493
494    use crate::{
495        constants::{
496            COMPRESSION_META_KEY, STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY,
497            STRUCTURAL_ENCODING_MINIBLOCK,
498        },
499        testing::check_specific_random,
500    };
501    use rstest::rstest;
502    use std::{collections::HashMap, sync::Arc, vec};
503
504    use crate::{
505        testing::{
506            check_basic_random, check_round_trip_encoding_of_data, FnArrayGeneratorProvider,
507            TestCases,
508        },
509        version::LanceFileVersion,
510    };
511
512    #[test_log::test(tokio::test)]
513    async fn test_utf8_binary() {
514        let field = Field::new("", DataType::Utf8, false);
515        check_specific_random(
516            field,
517            TestCases::basic().with_min_file_version(LanceFileVersion::V2_1),
518        )
519        .await;
520    }
521
522    #[rstest]
523    #[test_log::test(tokio::test)]
524    async fn test_binary(
525        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
526        structural_encoding: &str,
527        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
528    ) {
529        let mut field_metadata = HashMap::new();
530        field_metadata.insert(
531            STRUCTURAL_ENCODING_META_KEY.to_string(),
532            structural_encoding.into(),
533        );
534
535        let field = Field::new("", data_type, false).with_metadata(field_metadata);
536        check_basic_random(field).await;
537    }
538
539    #[rstest]
540    #[test_log::test(tokio::test)]
541    async fn test_binary_fsst(
542        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
543        structural_encoding: &str,
544        #[values(DataType::Binary, DataType::Utf8)] data_type: DataType,
545    ) {
546        let mut field_metadata = HashMap::new();
547        field_metadata.insert(
548            STRUCTURAL_ENCODING_META_KEY.to_string(),
549            structural_encoding.into(),
550        );
551        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
552        let field = Field::new("", data_type, true).with_metadata(field_metadata);
553        // TODO (https://github.com/lancedb/lance/issues/4783)
554        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
555        check_specific_random(field, test_cases).await;
556    }
557
558    #[rstest]
559    #[test_log::test(tokio::test)]
560    async fn test_fsst_large_binary(
561        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
562        structural_encoding: &str,
563        #[values(DataType::LargeBinary, DataType::LargeUtf8)] data_type: DataType,
564    ) {
565        let mut field_metadata = HashMap::new();
566        field_metadata.insert(
567            STRUCTURAL_ENCODING_META_KEY.to_string(),
568            structural_encoding.into(),
569        );
570        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
571        let field = Field::new("", data_type, true).with_metadata(field_metadata);
572        check_specific_random(
573            field,
574            TestCases::basic().with_min_file_version(LanceFileVersion::V2_1),
575        )
576        .await;
577    }
578
579    #[test_log::test(tokio::test)]
580    async fn test_large_binary() {
581        let field = Field::new("", DataType::LargeBinary, true);
582        check_basic_random(field).await;
583    }
584
585    #[test_log::test(tokio::test)]
586    async fn test_large_utf8() {
587        let field = Field::new("", DataType::LargeUtf8, true);
588        check_basic_random(field).await;
589    }
590
591    #[rstest]
592    #[test_log::test(tokio::test)]
593    async fn test_small_strings(
594        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
595        structural_encoding: &str,
596    ) {
597        use crate::testing::check_basic_generated;
598
599        let mut field_metadata = HashMap::new();
600        field_metadata.insert(
601            STRUCTURAL_ENCODING_META_KEY.to_string(),
602            structural_encoding.into(),
603        );
604        let field = Field::new("", DataType::Utf8, true).with_metadata(field_metadata);
605        check_basic_generated(
606            field,
607            Box::new(FnArrayGeneratorProvider::new(move || {
608                lance_datagen::array::utf8_prefix_plus_counter("user_", /*is_large=*/ false)
609            })),
610        )
611        .await;
612    }
613
614    #[rstest]
615    #[test_log::test(tokio::test)]
616    async fn test_simple_binary(
617        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
618        structural_encoding: &str,
619        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
620    ) {
621        let string_array = StringArray::from(vec![Some("abc"), None, Some("pqr"), None, Some("m")]);
622        let string_array = arrow_cast::cast(&string_array, &data_type).unwrap();
623
624        let mut field_metadata = HashMap::new();
625        field_metadata.insert(
626            STRUCTURAL_ENCODING_META_KEY.to_string(),
627            structural_encoding.into(),
628        );
629
630        let test_cases = TestCases::default()
631            .with_range(0..2)
632            .with_range(0..3)
633            .with_range(1..3)
634            .with_indices(vec![0, 1, 3, 4]);
635        check_round_trip_encoding_of_data(
636            vec![Arc::new(string_array)],
637            &test_cases,
638            field_metadata,
639        )
640        .await;
641    }
642
643    #[test_log::test(tokio::test)]
644    async fn test_sliced_utf8() {
645        let string_array = StringArray::from(vec![Some("abc"), Some("de"), None, Some("fgh")]);
646        let string_array = string_array.slice(1, 3);
647
648        let test_cases = TestCases::default()
649            .with_range(0..1)
650            .with_range(0..2)
651            .with_range(1..2);
652        check_round_trip_encoding_of_data(
653            vec![Arc::new(string_array)],
654            &test_cases,
655            HashMap::new(),
656        )
657        .await;
658    }
659
660    #[test_log::test(tokio::test)]
661    async fn test_bigger_than_max_page_size() {
662        // Create an array with one single 32MiB string
663        let big_string = String::from_iter((0..(32 * 1024 * 1024)).map(|_| '0'));
664        let string_array = StringArray::from(vec![
665            Some(big_string),
666            Some("abc".to_string()),
667            None,
668            None,
669            Some("xyz".to_string()),
670        ]);
671
672        // Drop the max page size to 1MiB
673        let test_cases = TestCases::default().with_max_page_size(1024 * 1024);
674
675        check_round_trip_encoding_of_data(
676            vec![Arc::new(string_array)],
677            &test_cases,
678            HashMap::new(),
679        )
680        .await;
681
682        // This is a regression testing the case where a page with X rows is split into Y parts
683        // where the number of parts is not evenly divisible by the number of rows.  In this
684        // case we are splitting 90 rows into 4 parts.
685        let big_string = String::from_iter((0..(1000 * 1000)).map(|_| '0'));
686        let string_array = StringArray::from_iter_values((0..90).map(|_| big_string.clone()));
687
688        check_round_trip_encoding_of_data(
689            vec![Arc::new(string_array)],
690            &TestCases::default(),
691            HashMap::new(),
692        )
693        .await;
694    }
695
696    #[test_log::test(tokio::test)]
697    async fn test_empty_strings() {
698        // Scenario 1: Some strings are empty
699
700        let values = [Some("abc"), Some(""), None];
701        // Test empty list at beginning, middle, and end
702        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
703            let mut string_builder = StringBuilder::new();
704            for idx in order {
705                string_builder.append_option(values[idx]);
706            }
707            let string_array = Arc::new(string_builder.finish());
708            let test_cases = TestCases::default()
709                .with_indices(vec![1])
710                .with_indices(vec![0])
711                .with_indices(vec![2])
712                .with_indices(vec![0, 1]);
713            check_round_trip_encoding_of_data(
714                vec![string_array.clone()],
715                &test_cases,
716                HashMap::new(),
717            )
718            .await;
719            let test_cases = test_cases.with_batch_size(1);
720            check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new())
721                .await;
722        }
723
724        // Scenario 2: All strings are empty
725
726        // When encoding an array of empty strings there are no bytes to encode
727        // which is strange and we want to ensure we handle it
728        let string_array = Arc::new(StringArray::from(vec![Some(""), None, Some("")]));
729
730        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
731        check_round_trip_encoding_of_data(vec![string_array.clone()], &test_cases, HashMap::new())
732            .await;
733        let test_cases = test_cases.with_batch_size(1);
734        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
735    }
736
737    #[test_log::test(tokio::test)]
738    #[ignore] // This test is quite slow in debug mode
739    async fn test_jumbo_string() {
740        // This is an overflow test.  We have a list of lists where each list
741        // has 1Mi items.  We encode 5000 of these lists and so we have over 4Gi in the
742        // offsets range
743        let mut string_builder = LargeStringBuilder::new();
744        // a 1 MiB string
745        let giant_string = String::from_iter((0..(1024 * 1024)).map(|_| '0'));
746        for _ in 0..5000 {
747            string_builder.append_option(Some(&giant_string));
748        }
749        let giant_array = Arc::new(string_builder.finish()) as ArrayRef;
750        let arrs = vec![giant_array];
751
752        // // We can't validate because our validation relies on concatenating all input arrays
753        let test_cases = TestCases::default().without_validation();
754        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
755    }
756
757    #[rstest]
758    #[test_log::test(tokio::test)]
759    async fn test_binary_dictionary_encoding(
760        #[values(true, false)] with_nulls: bool,
761        #[values(100, 500, 35000)] dict_size: u32,
762    ) {
763        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
764        let strings = (0..dict_size)
765            .map(|i| i.to_string())
766            .collect::<Vec<String>>();
767
768        let repeated_strings: Vec<_> = strings
769            .iter()
770            .cycle()
771            .take(70000)
772            .enumerate()
773            .map(|(i, s)| {
774                if with_nulls && i % 7 == 0 {
775                    None
776                } else {
777                    Some(s.clone())
778                }
779            })
780            .collect();
781        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
782        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
783    }
784
785    #[test_log::test(tokio::test)]
786    async fn test_binary_encoding_verification() {
787        use lance_datagen::{ByteCount, RowCount};
788
789        let test_cases = TestCases::default()
790            .with_expected_encoding("variable")
791            .with_min_file_version(LanceFileVersion::V2_1);
792
793        // Test both automatic selection and explicit configuration
794        // 1. Test automatic binary encoding selection (small strings that won't trigger FSST)
795        let arr_small = lance_datagen::gen_batch()
796            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(10), false))
797            .into_batch_rows(RowCount::from(1000))
798            .unwrap()
799            .column(0)
800            .clone();
801        check_round_trip_encoding_of_data(vec![arr_small], &test_cases, HashMap::new()).await;
802
803        // 2. Test explicit "none" compression to force binary encoding
804        let metadata_explicit =
805            HashMap::from([("lance-encoding:compression".to_string(), "none".to_string())]);
806        let arr_large = lance_datagen::gen_batch()
807            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(50), false))
808            .into_batch_rows(RowCount::from(2000))
809            .unwrap()
810            .column(0)
811            .clone();
812        check_round_trip_encoding_of_data(vec![arr_large], &test_cases, metadata_explicit).await;
813    }
814
815    #[test]
816    fn test_binary_miniblock_with_misaligned_buffer() {
817        use super::BinaryMiniBlockDecompressor;
818        use crate::buffer::LanceBuffer;
819        use crate::compression::MiniBlockDecompressor;
820        use crate::data::DataBlock;
821
822        // Test case 1: u32 offsets
823        {
824            let decompressor = BinaryMiniBlockDecompressor {
825                bits_per_offset: 32,
826            };
827
828            // Create test data with u32 offsets
829            // BinaryMiniBlock format: all offsets followed by all string data
830            // Need to ensure total size is divisible by 4 for u32
831            let mut test_data = Vec::new();
832
833            // Offsets section (3 offsets for 2 values + 1 end offset)
834            test_data.extend_from_slice(&12u32.to_le_bytes()); // offset to start of strings (after offsets)
835            test_data.extend_from_slice(&15u32.to_le_bytes()); // offset to second string
836            test_data.extend_from_slice(&20u32.to_le_bytes()); // offset to end
837
838            // String data section
839            test_data.extend_from_slice(b"ABCXYZ"); // 6 bytes of string data
840            test_data.extend_from_slice(&[0, 0]); // 2 bytes padding to make total 20 bytes (divisible by 4)
841
842            // Create a misaligned buffer by adding padding and slicing
843            let mut padded = Vec::with_capacity(test_data.len() + 1);
844            padded.push(0xFF); // Padding byte to misalign
845            padded.extend_from_slice(&test_data);
846
847            let bytes = bytes::Bytes::from(padded);
848            let misaligned = bytes.slice(1..); // Skip first byte to create misalignment
849
850            // Create LanceBuffer with bytes_per_value=1 to bypass alignment check
851            let buffer = LanceBuffer::from_bytes(misaligned, 1);
852
853            // Verify the buffer is actually misaligned
854            let ptr = buffer.as_ref().as_ptr();
855            assert_ne!(
856                ptr.align_offset(4),
857                0,
858                "Test setup: buffer should be misaligned for u32"
859            );
860
861            // Decompress with misaligned buffer - should work with borrow_to_typed_slice
862            let result = decompressor.decompress(vec![buffer], 2);
863            assert!(
864                result.is_ok(),
865                "Decompression should succeed with misaligned buffer"
866            );
867
868            // Verify the data is correct
869            if let Ok(DataBlock::VariableWidth(block)) = result {
870                assert_eq!(block.num_values, 2);
871                // Data should be the strings (including padding from the original buffer)
872                assert_eq!(&block.data.as_ref()[..6], b"ABCXYZ");
873            } else {
874                panic!("Expected VariableWidth block");
875            }
876        }
877
878        // Test case 2: u64 offsets
879        {
880            let decompressor = BinaryMiniBlockDecompressor {
881                bits_per_offset: 64,
882            };
883
884            // Create test data with u64 offsets
885            let mut test_data = Vec::new();
886
887            // Offsets section (3 offsets for 2 values + 1 end offset)
888            test_data.extend_from_slice(&24u64.to_le_bytes()); // offset to start of strings (after offsets)
889            test_data.extend_from_slice(&29u64.to_le_bytes()); // offset to second string
890            test_data.extend_from_slice(&40u64.to_le_bytes()); // offset to end (divisible by 8)
891
892            // String data section
893            test_data.extend_from_slice(b"HelloWorld"); // 10 bytes of string data
894            test_data.extend_from_slice(&[0, 0, 0, 0, 0, 0]); // 6 bytes padding to make total 40 bytes (divisible by 8)
895
896            // Create misaligned buffer
897            let mut padded = Vec::with_capacity(test_data.len() + 3);
898            padded.extend_from_slice(&[0xFF, 0xFF, 0xFF]); // 3 bytes padding for misalignment
899            padded.extend_from_slice(&test_data);
900
901            let bytes = bytes::Bytes::from(padded);
902            let misaligned = bytes.slice(3..); // Skip 3 bytes
903
904            let buffer = LanceBuffer::from_bytes(misaligned, 1);
905
906            // Verify misalignment for u64
907            let ptr = buffer.as_ref().as_ptr();
908            assert_ne!(
909                ptr.align_offset(8),
910                0,
911                "Test setup: buffer should be misaligned for u64"
912            );
913
914            // Decompress should succeed
915            let result = decompressor.decompress(vec![buffer], 2);
916            assert!(
917                result.is_ok(),
918                "Decompression should succeed with misaligned u64 buffer"
919            );
920
921            if let Ok(DataBlock::VariableWidth(block)) = result {
922                assert_eq!(block.num_values, 2);
923                // Data should be the strings (including padding from the original buffer)
924                assert_eq!(&block.data.as_ref()[..10], b"HelloWorld");
925            } else {
926                panic!("Expected VariableWidth block");
927            }
928        }
929    }
930}